package com.stay4it.rxjava;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.stay4it.rxjava.bean.Student;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class RxJava02 {
	
	/**
	 *  Java中的Runnable、Callable、Future、FutureTask的区别
	 * @author lzx
	 * 参考博客：http://blog.csdn.net/jdsjlzx/article/details/52912701
	 */

	public static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println("run");
        }

    }
    public static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return 20;
        }

    }
	
	static ExecutorService executorService = Executors.newCachedThreadPool();
	
	static List<String> list = Arrays.asList("小王","小李","赵四");
	static String[] array = new String[]{"Stay","谷歌小弟","Star"};

	/**
	 * from(Future<? extends T> future)
	 * 了解Future参见：http://blog.csdn.net/jdsjlzx/article/details/52912701
	 */
	private static void test1(){
		Future<Integer> future = executorService.submit(new CallableTask());
		
		Observable.from(future)
		.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(Integer value) {
				System.out.println("onSuccess value = " + value);
			}

			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		executorService.shutdown();
	}
	
	/**
	 * from(Iterable<? extends T> iterable)
	 */
	private static void test2(){
		Observable.from(list)
		.subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError error = " + e);				
			}

			@Override
			public void onNext(String t) {
				// TODO Auto-generated method stub
				System.out.println("onNext value = " + t);
			}
			
		});
		
	}
	

	/**
	 * from(T[] array)
	 */
	private static void test3(){
		Observable.from(array)
		.subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError error = " + e);				
			}

			@Override
			public void onNext(String t) {
				// TODO Auto-generated method stub
				System.out.println("onNext value = " + t);
			}
			
		});
		
	}
	
	/**
	 * from(Future<? extends T> future, Scheduler scheduler)
	 */
	private static void test4(){
		Future<Integer> future = executorService.submit(new CallableTask());
		
		Observable.from(future, Schedulers.computation())
		.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError error = " + e);				
			}

			@Override
			public void onNext(Integer t) {
				// TODO Auto-generated method stub
				System.out.println("onNext value = " + t);
			}
			
		});
		
	}
	
	/**
	 * timeout:超时时间
	 * from(Future<? extends T> future, long timeout, TimeUnit unit)
	 */
	private static void test5(){
		Future<Integer> future = executorService.submit(new CallableTask());
		
		Observable.from(future, 2,TimeUnit.SECONDS)
		.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError error = " + e);				
			}

			@Override
			public void onNext(Integer t) {
				System.out.println("onNext value = " + t);
			}
			
		});
		
	}
	
	public static void main(String[] args) {
		test5();
	}
	
	
}
